Solutions/ZoomReports/Data Connectors/ZoomSentinelConnector/__init__.py (299 lines of code) (raw):
import azure.functions as func
import datetime
import json
import base64
import hashlib
import hmac
import requests
import re
import os
import logging
import time
from .state_manager import StateManager
zoom_account_id = os.environ['AccountID']
zoom_client_id = os.environ['ClientID']
zoom_client_secret = os.environ['ClientSecret']
customer_id = os.environ['WorkspaceID']
shared_key = os.environ['WorkspaceKey']
connection_string = os.environ['AzureWebJobsStorage']
logAnalyticsUri = os.environ.get('logAnalyticsUri')
table_name = "Zoom"
chunksize = 10000
retry = 3 ## To do : need to move function app configuration
error=False
#Max script execution
SCRIPT_EXECUTION_INTERVAL_MINUTES = os.environ.get('EXECUTION_INTERVAL_MINUTES')
#Azure function max execution
AZURE_FUNC_MAX_EXECUTION_TIME_MINUTES = os.environ.get('MAX_EXECUTION_TIME_MINUTES')
##Default values
Default_Values = {
"SCRIPT_EXECUTION_INTERVAL_MINUTES": 30,
"AZURE_FUNC_MAX_EXECUTION_TIME_MINUTES": 29,
}
##Original Values
Orginal_Values={
"SCRIPT_EXECUTION_INTERVAL_MINUTES":SCRIPT_EXECUTION_INTERVAL_MINUTES,
"AZURE_FUNC_MAX_EXECUTION_TIME_MINUTES":AZURE_FUNC_MAX_EXECUTION_TIME_MINUTES
}
def is_number_regex(sn):
""" Returns True if string is a number and False for other types. """
if re.match("^\d+?\.\d+?$", sn) is None:
return sn.isdigit()
else:
return False
##This code checks for script execution time and azure func max interval mins for null,int,float and assign based on validations and need to validate the varables
def validate_varable(Var_value,var):
""" Returns True if string is a number and False for other data types. """
if ((Var_value in(None,'') or str(Var_value).isspace())):
temp_var=Default_Values.get(var)
globals()[var]=int(temp_var)
else:
logging.info("{}: {}".format(var,Var_value))
if(is_number_regex(Var_value)):
globals()[var]=int(Var_value)
else:
tempmsg="Please enter correct value for {}".format(var)
raise Exception(tempmsg)
def paramter_validation():
""" Validates the paramters for the inputs with default values by comparing with orginal values. """
for key,val in Orginal_Values.items():
validate_varable(val,key)
##This method is used for parameter validation
paramter_validation()
if ((logAnalyticsUri in (None, '') or str(logAnalyticsUri).isspace())):
logAnalyticsUri = 'https://' + customer_id + '.ods.opinsights.azure.com'
pattern = r'https:\/\/([\w\-]+)\.ods\.opinsights\.azure.([a-zA-Z\.]+)$'
match = re.match(pattern, str(logAnalyticsUri))
if (not match):
raise Exception("Zoom: Invalid Log Analytics Uri.")
class Zoom:
"""This class has methods of zoom api report generation and oauth token generation
"""
def __init__(self):
"""This is the method where declares all the self initiating varables
Raises:
Exception: Raises the exception when access token object is none
"""
self.account_id = zoom_account_id
self.client_id = zoom_client_id
self.client_secret = zoom_client_secret
self.retry = retry
self.error_statuses = [429, 500, 502, 503, 504]
self.base_url = "https://api.zoom.us/v2"
self.token_url = "https://zoom.us/oauth/token?grant_type=account_credentials&account_id="+zoom_account_id
self.oauth_token = self.generate_oauth_token()
self.from_day, self.to_day = self.generate_date()
if self.oauth_token is not None:
self.headers = {
'Accept': 'application/json',
'authorization': "Bearer " + self.oauth_token,
}
else:
raise Exception("Unable to generate access token")
def generate_oauth_token(self):
"""This method used to generate the oauth zoom token
Returns:
_type_: the zoom oauth token
"""
error=False
for _ in range(self.retry):
try:
base64String = base64.b64encode(
f"{self.client_id}:{self.client_secret}".encode('utf-8')).decode("ascii")
headersfortokens = {
'Accept': 'application/json',
'authorization': "Basic " + base64String,
}
query_params = {
"grant_type": "account_credentials",
"account_id": self.account_id,
}
oauth_token = requests.post(url=self.token_url,
params=query_params,
headers=headersfortokens)
if (oauth_token.status_code in self.error_statuses) or (error==True):
error=False
continue ## To Do: Need to add delay
if oauth_token.status_code == 200:
jsonData = json.loads(oauth_token.text)
auth_token = jsonData['access_token']
return auth_token
elif oauth_token.status_code == 400:
logging.error("The requested report cannot be generated for this account because"
" this account has not subscribed to toll-free audio conference plan."
" Error code: {}".format(oauth_token.status_code))
elif oauth_token.status_code == 401:
logging.error("Invalid access token. Error code: {}".format(
oauth_token.status_code))
elif oauth_token.status_code == 300:
logging.error("Only provide report in recent 6 months. Error code: {}".format(
oauth_token.status_code))
except Exception as err:
error=True
logging.error("Something wrong. Exception error text: {}".format(err))
def generate_date(self):
"""This method is used to generate date and stores in file share state
Returns:
_type_: past time and current time
"""
current_time_day = datetime.datetime.utcnow().replace(second=0, microsecond=0)
state = StateManager(connection_string)
past_time = state.get()
if past_time is not None:
logging.info("The last time point is: {}".format(past_time))
else:
logging.info(
"There is no last time point, trying to get events for last week.")
past_time = (current_time_day -
datetime.timedelta(days=7)).strftime("%Y-%m-%d")
past_time_datetime = datetime.datetime.strptime(past_time, '%Y-%m-%d')
no_days=(current_time_day-past_time_datetime)
if(no_days.days>7):
current_time_day= (past_time_datetime +datetime.timedelta(days=7))
logging.info("The current time point is: {}".format(current_time_day))
state.post(current_time_day.strftime("%Y-%m-%d"))
return (past_time, current_time_day.strftime("%Y-%m-%d"))
def get_report(self, report_type_suffix, next_page_token=None):
"""This is method is used to get report from zoom api
Args:
report_type_suffix (_type_): zoom report types
next_page_token (_type_, optional): _description_. Defaults to None.
Returns:
_type_: response from zoom report
"""
query_params = {
"page_size": 300,
"from": self.from_day,
"to": self.to_day
}
if next_page_token:
query_params.update({"next_page_token": next_page_token})
error=False
for _ in range(self.retry):
try:
r = requests.get(url=self.base_url + report_type_suffix,
params=query_params,
headers=self.headers)
if (r.status_code in self.error_statuses) or (error==True):
error=False
continue ## To Do: Need to add delay
if r.status_code == 200:
return r.json()
elif r.status_code == 400:
logging.error("The requested report cannot be generated for this account because"
" this account has not subscribed to toll-free audio conference plan."
" Error code: {}".format(r.status_code))
elif r.status_code == 401:
logging.error(
"Invalid access token. Error code: {}".format(r.status_code))
elif r.status_code == 300:
logging.error("Only provide report in recent 6 months. Error code: {}".format(
r.status_code))
else:
logging.error(
"Something wrong. Error code: {}".format(r.status_code))
except Exception as err:
error=True
logging.error("Something wrong. Exception error text: {}".format(err))
class Sentinel:
"""This class have mentods to initate data,post data to log analytics
"""
def __init__(self):
"""This is used to initiate the varaibles
"""
self.logAnalyticsUri = logAnalyticsUri
self.success_processed = 0
self.fail_processed = 0
self.table_name = table_name
self.chunksize = chunksize
def gen_chunks_to_object(self, data, chunksize=100):
"""This is used to generate chunks to object based on chunk size
Args:
data (_type_): data from zoom reports api
chunksize (int, optional): _description_. Defaults to 100.
Yields:
_type_: the chunk
"""
chunk = []
for index, line in enumerate(data):
if (index % chunksize == 0 and index > 0):
yield chunk
del chunk[:]
chunk.append(line)
yield chunk
def gen_chunks(self, data):
"""This method is used to get the chunks and post the data to log analytics work space
Args:
data (_type_): _description_
"""
for chunk in self.gen_chunks_to_object(data, chunksize=self.chunksize):
obj_array = []
for row in chunk:
if row != None and row != '':
obj_array.append(row)
body = json.dumps(obj_array)
self.post_data(body, len(obj_array))
def build_signature(self, date, content_length, method, content_type, resource):
"""This method is used to build signature for log analytics work space
Args:
date (_type_): RFC format date
content_length (_type_): length of the data from zoom api
method (_type_): This is post call
content_type (_type_): application/json
resource (_type_): api/logs
Returns:
_type_: authorized signature for log analytics work space
"""
x_headers = 'x-ms-date:' + date
string_to_hash = method + "\n" + \
str(content_length) + "\n" + content_type + \
"\n" + x_headers + "\n" + resource
bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
decoded_key = base64.b64decode(shared_key)
encoded_hash = base64.b64encode(
hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
authorization = "SharedKey {}:{}".format(customer_id, encoded_hash)
return authorization
def post_data(self, body, chunk_count):
"""This method is used to post the data to log analytics work space
Args:
body (_type_): content from zoom report api as chunks
chunk_count (_type_): chunk count
"""
method = 'POST'
content_type = 'application/json'
resource = '/api/logs'
rfc1123date = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
content_length = len(body)
signature = self.build_signature(rfc1123date, content_length, method, content_type,
resource)
uri = self.logAnalyticsUri + resource + '?api-version=2016-04-01'
headers = {
'content-type': content_type,
'Authorization': signature,
'Log-Type': self.table_name,
'x-ms-date': rfc1123date
}
response = requests.post(uri, data=body, headers=headers)
if (response.status_code >= 200 and response.status_code <= 299):
logging.info("Chunk was processed({} events)".format(chunk_count))
self.success_processed = self.success_processed + chunk_count
else:
logging.error("Error during sending events to Microsoft Sentinel. Response code:{}".format(
response.status_code))
self.fail_processed = self.fail_processed + chunk_count
def results_array_join(result_element, api_req_id, api_req_name):
"""This method is used to join all the results from zoom api
Args:
result_element (_type_): This will have result element
api_req_id (_type_): zoom api req id
api_req_name (_type_): zoom api req name
"""
for element in result_element[api_req_id]:
element['event_type'] = api_req_id
element['event_name'] = api_req_name
results_array.append(element)
def check_if_functiontime_is_over(start_time, interval_minutes, max_script_exec_time_minutes):
"""Returns True if function's execution time is less than interval between function executions and
less than max azure func lifetime. In other case returns False."""
logging.info("started Max function time check")
logging.info("interval_minutes({} time)".format(interval_minutes))
logging.info("max_script_exec_time_minutes({} time)".format(max_script_exec_time_minutes))
min_minutes = min(interval_minutes, max_script_exec_time_minutes)
if min_minutes > 1:
max_time = min_minutes * 60 - 30
else:
raise Exception("Script execution mins is less than 1 min")
logging.info("max time({} time)".format(max_time))
script_execution_time = time.time() - start_time
logging.info("script execution time({} time)".format(script_execution_time))
if script_execution_time > max_time:
return True
else:
return False
def get_main_info(start_time):
"""This is the main method to get response from zoom reports api
"""
for api_req_id, api_req_info in reports_api_requests_dict.items():
api_req = api_req_info['api_req']
api_req_name = api_req_info['name']
logging.info("Getting report: {}".format(api_req_info['name']))
result = zoom.get_report(report_type_suffix=api_req)
if result is not None:
next_page_token = result.get('next_page_token')
results_array_join(result, api_req_id, api_req_name)
else:
next_page_token = None
while next_page_token:
result = zoom.get_report(
report_type_suffix=api_req, next_page_token=next_page_token)
if result is not None:
next_page_token = result.get('next_page_token')
results_array_join(result, api_req_id, api_req_name)
else:
next_page_token = None
if check_if_functiontime_is_over(start_time, SCRIPT_EXECUTION_INTERVAL_MINUTES, AZURE_FUNC_MAX_EXECUTION_TIME_MINUTES):
logging.info('Stopping script because time for execution is over')
break
def main(mytimer: func.TimerRequest) -> None:
"""This is the main method for starting the zoom functionality
Args:
mytimer (func.TimerRequest): Timer based function app
"""
start_time = time.time()
utc_timestamp = datetime.datetime.utcnow().replace(
tzinfo=datetime.timezone.utc).isoformat()
if mytimer.past_due:
logging.info('The timer is past due!')
logging.info('Python timer trigger function ran at %s', utc_timestamp)
logging.info('Starting program')
global results_array, reports_api_requests_dict, zoom
reports_api_requests_dict = \
{
"dates": {"api_req": "/report/daily", "name": "Daily Usage Reports."},
"users": {"api_req": "/report/users", "name": "Active/Inactive Host Reports."},
"telephony_usage": {"api_req": "/report/telephone", "name": "Telephone Reports."},
"cloud_recording_storage": {"api_req": "/report/cloud_recording", "name": "Cloud Recording Usage Reports."},
"operation_logs": {"api_req": "/report/operationlogs", "name": "Operation Logs Report."},
"activity_logs": {"api_req": "/report/activities", "name": "Sign In/Sign Out Activity Report."}
}
results_array = []
zoom = Zoom()
sentinel = Sentinel()
zoom_class_vars = vars(zoom)
from_day, to_day = zoom_class_vars['from_day'], zoom_class_vars['to_day']
logging.info(
'Trying to get events for period: {} - {}'.format(from_day, to_day))
get_main_info(start_time)
sentinel.gen_chunks(results_array)
sentinel_class_vars = vars(sentinel)
success_processed, fail_processed = sentinel_class_vars["success_processed"],\
sentinel_class_vars["fail_processed"]
logging.info('Total events processed successfully: {}, failed: {}. Period: {} - {}'
.format(success_processed, fail_processed, from_day, to_day))